package com.chenjl.trace.stream.test;

import com.chenjl.trace.stream.topology.SentencesRichSpout;
import com.chenjl.trace.stream.topology.WordCountRichBolt;
import com.chenjl.trace.stream.topology.WordSplitRichBolt;
import com.chenjl.trace.stream.util.Constant;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import lombok.extern.slf4j.Slf4j;
/**
 * 本地运行Topology
 * 2019-1-10 18:30:42
 * @author chenjinlong
 */
@Slf4j
public class TraceStreamApp {	
	
	public static void main(String[] args) throws InterruptedException {
		log.info("TraceStreamApp begin~~~");
		
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout(Constant.SENTENCESRICHSPOUT_ID, new SentencesRichSpout(),Constant.spoutParal);
		topologyBuilder.setBolt(Constant.WORDSPLITRICHBOLT_ID, new WordSplitRichBolt(),Constant.boltParal).localOrShuffleGrouping(Constant.SENTENCESRICHSPOUT_ID);
		topologyBuilder.setBolt(Constant.WORDCOUNTRICHBOLT_ID, new WordCountRichBolt(),Constant.boltParal).localOrShuffleGrouping(Constant.WORDSPLITRICHBOLT_ID);
		
        Config config = new Config();
        config.setNumWorkers(Constant.WORKER_NUM);
        config.setDebug(true);
        
		//本地模式
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(Constant.TOPOLOGY_NAME,config,topologyBuilder.createTopology());
        
        localCluster.killTopology(Constant.TOPOLOGY_NAME);
        localCluster.shutdown();
        log.info("TraceStreamApp end~~~");
	}
}